跳到主要内容

使用 ETCD 作为注册中心

Etcd 是什么?

Etcd 是一个使用一致性哈希算法 (Raft) 在分布式环境下的 key/value 存储服务(就是 Go 生态下的 Zookeeper)。利用 Etcd 的特性,应用程序可以在集群中共享信息、配置或作服务发现,Etcd 会在集群的各个节点中复制这些数据并保证这些数据始终正确。所以除了 Consul 之外,在 Go 生态中,还可以选择基于 Etcd 作为注册中心

配置环境

使用 Docker Compose 模拟启动一个 3 节点的 etcd 集群。

编辑 docker-compose.yml 文件,参考该 配置

使用 docker-compose up 启动集群之后使用 docker exec 命令登录到任一节点测试 etcd 集群。

# 检查集群是否配置好(下面的命令也可以通过 docker 这样外部执行)
$ docker exec 7efaa50ed64f /bin/sh -c 'etcdctl member list'

daf3fd52e3583ff, started, node3, http://172.16.238.102:2380, http://172.16.238.102:2379, false
422a74f03b622fef, started, node1, http://172.16.238.100:2380, http://172.16.238.100:2379, false
ed635d2a2dbef43d, started, node2, http://172.16.238.101:2380, http://172.16.238.101:2379, false

如果要进行键值的存储和读取的话,对应的交互指令如下

# 推送 key-value
$ etcdctl put hello world
OK

# 取得数据
$ etcdctl get hello
hello
world

# 可以只显示 value
$ etcdctl get hello --print-value-only
world

# 删除 key
$ etcdctl del hello

还可以通过新开一个终端窗口通过 watch 指令监听上述键值变更:

$ etcdctl watch hello

20220429110016

基于 ETCD 的服务发现原理

当我们的服务有一定规模之后,因为一个服务可能会被很多个服务依赖,我们就需要能够动态增减节点,而无需修改很多的调用方配置并重启。

整体的流程如下:

在 Go 中使用

这里使用官方给的 客户端

go get go.etcd.io/etcd/client/v3

put 命令用来设置键值对数据,get 命令用来根据 key 获取值。

import (
"context"
"testing"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
)

func TestConnet(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"172.16.238.101:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
t.Errorf("connect to etcd failed, err:%v\n", err)
return
}

t.Log("connect to etcd success")
defer cli.Close()

// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
t.Errorf("put to etcd failed, err:%v\n", err)
return
}

// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
t.Errorf("get from etcd failed, err:%v\n", err)
return
}

for _, ev := range resp.Kvs {
t.Logf("%s:%s\n", ev.Key, ev.Value)
}
}

自己编写服务发现

服务发现的第一个工作就是从 ETCD 里面读取相应的服务配置

创建客户端

创建一个客户端

func NewEtcd(urls string) (*clientv3.Client, error) {
cfg := clientv3.Config{
Endpoints: strings.Split(urls, ","),
DialTimeout: 10 * time.Second,
}

username, password := config.GetEtcdUsername(), config.GetEtcdPassword()
if username != "" && password != "" {
cfg.Username = username
cfg.Password = password
}

cli, err := clientv3.New(cfg)
if err != nil {
return nil, fmt.Errorf("client.New err: %v", err)
}

return cli, nil
}

服务配置的读写

原理就是把服务配置通过 json 序列化的方式读写

const (
ROOT = "/"
SERVICE = "service"
DEFAULT_CLUSTER = "default"
)

type Config struct {
ServiceVersion string `json:"service_version"`
ServicePort string `json:"service_port"`
HttpPort string `json:"http_port"`
IsSsl bool `json:"is_ssl"`
}

type ServiceConfig struct {
EtcdServerUrl string
ServerName string
Config
}

func NewServiceConfig(etcdServerUrl, serverName string) *ServiceConfig {
return &ServiceConfig{
EtcdServerUrl: etcdServerUrl,
ServerName: serverName,
}
}


func (c *ServiceConfig) GetKeyName(serverName string) string {
return ROOT + SERVICE + "." + serverName + "." + DEFAULT_CLUSTER
}

读取配置

func (c *ServiceConfig) GetConfig() (*Config, error) {
cli, err := util.NewEtcd(c.EtcdServerUrl)
if err != nil {
return nil, fmt.Errorf("generating etcd client failed to %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

key := c.GetKeyName(c.ServerName)
serviceInfo, err := cli.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("etcd client Get key %s failed to %v", key, err)
}

var config Config
if len(serviceInfo.Kvs) > 0 {
err := json.Unmarshal(serviceInfo.Kvs[0].Value, &config)
if err != nil {
return nil, fmt.Errorf("json.Unmarshal err: %v", err)
}
}

if config.ServicePort == "" {
return nil, fmt.Errorf("servicePort is empty, key: %s", key)
}

return &config, nil
}

写入配置

func (c *ServiceConfig) WriteConfig(cf Config) error {
cli, err := util.NewEtcd(c.EtcdServerUrl)
if err != nil {
return fmt.Errorf("generating etcd client failed to %v", err)
}
key := c.GetKeyName(c.ServerName)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

cfJson, err := marshalToString(&cf)
if err != nil {
return fmt.Errorf("json.MarshalToString err: %v", err)
}

_, err = cli.Put(ctx, key, cfJson)
if err != nil {
return fmt.Errorf("cli.Put err: %v", err)
}

return nil
}

func marshalToString(v interface{}) (string, error) {
b, err := json.Marshal(v)
return string(b), err
}

测试使用

package etcdconfig

import (
"context"
"reflect"
"testing"

"alsritter.icu/rabbit/internal/util"
)

func TestServiceConfig_GetConfig(t *testing.T) {
type fields struct {
EtcdServerUrl string
ServerName string
}
tests := []struct {
name string
fields fields
want *Config
wantErr bool
}{
{
"test read service config info from etcd",
fields{
EtcdServerUrl: "172.16.238.101:2379",
ServerName: "rabbit-test-read-server",
},
&Config{
ServiceVersion: "1.0",
ServicePort: "6060",
HttpPort: "7070",
IsSsl: true,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ServiceConfig{
EtcdServerUrl: tt.fields.EtcdServerUrl,
ServerName: tt.fields.ServerName,
}

cli, _ := util.NewEtcd(c.EtcdServerUrl)
key := c.GetKeyName(c.ServerName)
cli.Put(context.Background(), key, `{"service_version":"1.0","service_port":"6060","http_port":"7070","is_ssl":true}`)

got, err := c.GetConfig()
if (err != nil) != tt.wantErr {
t.Errorf("ServiceConfig.GetConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ServiceConfig.GetConfig() = %v, want %v", got, tt.want)
}
})
}
}

func TestServiceConfig_WriteConfig(t *testing.T) {
type fields struct {
EtcdServerUrl string
ServerName string
}
tests := []struct {
name string
fields fields
config Config
wantErr bool
}{
{
"test write service config info to etcd",
fields{
EtcdServerUrl: "172.16.238.101:2379",
ServerName: "rabbit-test-write-server",
},
Config{
ServiceVersion: "1.0",
ServicePort: "8080",
HttpPort: "9090",
IsSsl: true,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ServiceConfig{
EtcdServerUrl: tt.fields.EtcdServerUrl,
ServerName: tt.fields.ServerName,
}
cli, _ := util.NewEtcd(c.EtcdServerUrl)
key := c.GetKeyName(c.ServerName)
cli.Delete(context.Background(), key)

if err := c.WriteConfig(tt.config); (err != nil) != tt.wantErr {
t.Errorf("ServiceConfig.WriteConfig() error = %v, wantErr %v", err, tt.wantErr)
}

serviceInfo, _ := cli.Get(context.Background(), key)
if string(serviceInfo.Kvs[0].Value) != `{"service_version":"1.0","service_port":"8080","http_port":"9090","is_ssl":true}` {
t.Errorf("write failed")
}
})
}
}

References

深度解析 Raft 分布式一致性协议